from __future__ import annotations

import abc
from contextlib import contextmanager
import contextvars
import functools
import heapq
import logging
import re
from typing import Any

import voluptuous as vol

from esphome import core, loader, pins, yaml_util
from esphome.config_helpers import Extend, Remove
import esphome.config_validation as cv
from esphome.const import (
    CONF_ESPHOME,
    CONF_EXTERNAL_COMPONENTS,
    CONF_ID,
    CONF_MIN_VERSION,
    CONF_PACKAGES,
    CONF_PLATFORM,
    CONF_SUBSTITUTIONS,
)
from esphome.core import CORE, DocumentRange, EsphomeError
import esphome.core.config as core_config
import esphome.final_validate as fv
from esphome.helpers import indent
from esphome.loader import ComponentManifest, get_component, get_platform
from esphome.log import AnsiFore, color
from esphome.types import ConfigFragmentType, ConfigType
from esphome.util import OrderedDict, safe_print
from esphome.voluptuous_schema import ExtraKeysInvalid
from esphome.yaml_util import ESPForceValue, ESPHomeDataBase, is_secret

_LOGGER = logging.getLogger(__name__)


def iter_components(config):
    for domain, conf in config.items():
        component = get_component(domain)
        yield domain, component
        if component.is_platform_component:
            for p_config in conf:
                p_name = f"{domain}.{p_config[CONF_PLATFORM]}"
                platform = get_platform(domain, p_config[CONF_PLATFORM])
                yield p_name, platform


def iter_component_configs(config):
    for domain, conf in config.items():
        component = get_component(domain)
        if component.multi_conf:
            for conf_ in conf:
                yield domain, component, conf_
        else:
            yield domain, component, conf
        if component.is_platform_component:
            for p_config in conf:
                p_name = f"{domain}.{p_config[CONF_PLATFORM]}"
                platform = get_platform(domain, p_config[CONF_PLATFORM])
                yield p_name, platform, p_config


ConfigPath = list[str | int]
path_context = contextvars.ContextVar("Config path")


def _process_platform_config(
    result: Config,
    component_name: str,
    platform_name: str,
    platform_config: ConfigType,
    path: ConfigPath,
) -> None:
    """Process a platform configuration and add necessary validation steps.

    This is shared between LoadValidationStep and AutoLoadValidationStep to avoid duplication.
    """
    # Get the platform manifest
    platform = get_platform(component_name, platform_name)
    if platform is None:
        result.add_str_error(
            f"Platform not found: '{component_name}.{platform_name}'", path
        )
        return

    # Add platform to loaded integrations
    CORE.loaded_integrations.add(platform_name)
    CORE.loaded_platforms.add(f"{component_name}/{platform_name}")

    # Process platform's AUTO_LOAD
    for load in platform.auto_load:
        if load not in result:
            result.add_validation_step(AutoLoadValidationStep(load))

    # Add validation steps for the platform
    p_domain = f"{component_name}.{platform_name}"
    result.add_output_path(path, p_domain)
    result.add_validation_step(
        MetadataValidationStep(path, p_domain, platform_config, platform)
    )


def _path_begins_with(path: ConfigPath, other: ConfigPath) -> bool:
    if len(path) < len(other):
        return False
    return path[: len(other)] == other


@functools.total_ordering
class _ValidationStepTask:
    def __init__(self, priority: float, id_number: int, step: ConfigValidationStep):
        self.priority = priority
        self.id_number = id_number
        self.step = step

    @property
    def _cmp_tuple(self) -> tuple[float, int]:
        return (-self.priority, self.id_number)

    def __eq__(self, other):
        return self._cmp_tuple == other._cmp_tuple

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        return self._cmp_tuple < other._cmp_tuple


class Config(OrderedDict, fv.FinalValidateConfig):
    def __init__(self):
        super().__init__()
        # A list of voluptuous errors
        self.errors: list[vol.Invalid] = []
        # A list of paths that should be fully outputted
        # The values will be the paths to all "domain", for example (['logger'], 'logger')
        # or (['sensor', 'ultrasonic'], 'sensor.ultrasonic')
        self.output_paths: list[tuple[ConfigPath, str]] = []
        # A list of components ids with the config path
        self.declare_ids: list[tuple[core.ID, ConfigPath]] = []
        self._data = {}
        # Store pending validation tasks (in heap order)
        self._validation_tasks: list[_ValidationStepTask] = []
        # ID to ensure stable order for keys with equal priority
        self._validation_tasks_id = 0

    def add_error(self, error: vol.Invalid) -> None:
        if isinstance(error, vol.MultipleInvalid):
            for err in error.errors:
                self.add_error(err)
            return
        if cv.ROOT_CONFIG_PATH in error.path:
            # Root value means that the path before the root should be ignored
            last_root = max(
                i for i, v in enumerate(error.path) if v is cv.ROOT_CONFIG_PATH
            )
            # can't change the path so re-create the error
            error = vol.Invalid(
                message=error.error_message,
                path=error.path[last_root + 1 :],
                error_type=error.error_type,
            )
        self.errors.append(error)

    def add_validation_step(self, step: ConfigValidationStep):
        id_num = self._validation_tasks_id
        self._validation_tasks_id += 1
        heapq.heappush(
            self._validation_tasks, _ValidationStepTask(step.priority, id_num, step)
        )

    def run_validation_steps(self):
        while self._validation_tasks and not self.errors:
            task = heapq.heappop(self._validation_tasks)
            task.step.run(self)

    @contextmanager
    def catch_error(self, path=None):
        path = path or []
        try:
            yield
        except cv.FinalExternalInvalid as e:
            self.add_error(e)
        except vol.Invalid as e:
            e.prepend(path)
            self.add_error(e)

    def add_str_error(self, message: str, path: ConfigPath) -> None:
        self.add_error(vol.Invalid(message, path))

    def add_output_path(self, path: ConfigPath, domain: str) -> None:
        self.output_paths.append((path, domain))

    def remove_output_path(self, path: ConfigPath, domain: str) -> None:
        self.output_paths.remove((path, domain))

    def is_in_error_path(self, path: ConfigPath) -> bool:
        for err in self.errors:
            if _path_begins_with(err.path, path):
                return True
        return False

    def set_by_path(self, path, value):
        conf = self
        for key in path[:-1]:
            conf = conf[key]
        conf[path[-1]] = value

    def get_error_for_path(self, path: ConfigPath) -> vol.Invalid | None:
        for err in self.errors:
            if self.get_deepest_path(err.path) == path:
                self.errors.remove(err)
                return err
        return None

    def get_deepest_document_range_for_path(
        self, path: ConfigPath, get_key: bool = False
    ) -> DocumentRange | None:
        data = self
        doc_range = None
        for index, path_item in enumerate(path):
            try:
                if path_item in data:
                    key_data = [x for x in data.keys() if x == path_item][0]
                    if isinstance(key_data, ESPHomeDataBase):
                        doc_range = key_data.esp_range
                        if get_key and index == len(path) - 1:
                            return doc_range
                data = data[path_item]
            except (KeyError, IndexError, TypeError, AttributeError):
                return doc_range
            if isinstance(data, core.ID):
                data = data.id
            if isinstance(data, ESPHomeDataBase) and data.esp_range is not None:
                doc_range = data.esp_range
            elif isinstance(data, dict):
                platform_item = data.get("platform")
                if (
                    isinstance(platform_item, ESPHomeDataBase)
                    and platform_item.esp_range is not None
                ):
                    doc_range = platform_item.esp_range

        return doc_range

    def get_nested_item(
        self, path: ConfigPath, raise_error: bool = False
    ) -> ConfigFragmentType:
        data = self
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                if raise_error:
                    raise
                return {}
        return data

    def get_deepest_path(self, path: ConfigPath) -> ConfigPath:
        """Return the path that is the deepest reachable by following path."""
        data = self
        part = []
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                return part
            part.append(item_index)
        return part

    def get_path_for_id(self, id: core.ID):
        """Return the config fragment where the given ID is declared."""
        for declared_id, path in self.declare_ids:
            if declared_id.id == str(id):
                return path
        raise KeyError(f"ID {id} not found in configuration")

    def get_config_for_path(self, path: ConfigPath) -> ConfigFragmentType:
        return self.get_nested_item(path, raise_error=True)

    @property
    def data(self):
        """Return temporary data used by final validation functions."""
        return self._data


def iter_ids(config, path=None):
    path = path or []
    if isinstance(config, core.ID):
        yield config, path
    elif isinstance(config, core.Lambda):
        for id in config.requires_ids:
            yield id, path
    elif isinstance(config, list):
        for i, item in enumerate(config):
            yield from iter_ids(item, path + [i])
    elif isinstance(config, dict):
        for key, value in config.items():
            if isinstance(key, core.ID):
                yield key, path
            yield from iter_ids(value, path + [key])


def recursive_check_replaceme(value):
    if isinstance(value, list):
        return cv.Schema([recursive_check_replaceme])(value)
    if isinstance(value, dict):
        return cv.Schema({cv.valid: recursive_check_replaceme})(value)
    if isinstance(value, ESPForceValue):
        pass
    if isinstance(value, str) and value == "REPLACEME":
        raise cv.Invalid(
            "Found 'REPLACEME' in configuration, this is most likely an error. "
            "Please make sure you have replaced all fields from the sample "
            "configuration.\n"
            "If you want to use the literal REPLACEME string, "
            'please use "!force REPLACEME"'
        )
    return value


class ConfigValidationStep(abc.ABC):
    """A step to for the validation phase."""

    # Priority of this step, higher means run earlier
    priority: float = 0.0

    @abc.abstractmethod
    def run(self, result: Config) -> None: ...  # noqa: E704


class LoadValidationStep(ConfigValidationStep):
    """Load step, this step is called once for each domain config fragment.

    Responsibilities:
    - Load component code
    - Ensure all AUTO_LOADs are added
    - Set output paths of result
    """

    def __init__(self, domain: str, conf: ConfigType):
        self.domain = domain
        self.conf = conf

    def run(self, result: Config) -> None:
        if self.domain.startswith("."):
            # Ignore top-level keys starting with a dot
            return
        result.add_output_path([self.domain], self.domain)
        component = get_component(self.domain)
        if (
            component is not None
            and component.multi_conf_no_default
            and isinstance(self.conf, core.AutoLoad)
        ):
            self.conf = []
        result[self.domain] = self.conf
        path = [self.domain]
        if component is None:
            result.add_str_error(f"Component not found: {self.domain}", path)
            return
        CORE.loaded_integrations.add(self.domain)

        # Process AUTO_LOAD
        for load in component.auto_load:
            if load not in result:
                result.add_validation_step(AutoLoadValidationStep(load))

        result.add_validation_step(
            MetadataValidationStep([self.domain], self.domain, self.conf, component)
        )

        if not component.is_platform_component:
            return

        # This is a platform component, proceed to reading platform entries
        # Remove this is as an output path
        result.remove_output_path([self.domain], self.domain)

        # Ensure conf is a list
        if not self.conf:
            result[self.domain] = self.conf = []
        elif not isinstance(self.conf, list):
            result[self.domain] = self.conf = [self.conf]

        for i, p_config in enumerate(self.conf):
            path = [self.domain, i]
            # Construct temporary unknown output path
            p_domain = f"{self.domain}.unknown"
            result.add_output_path(path, p_domain)
            result[self.domain][i] = p_config
            if not isinstance(p_config, dict):
                result.add_str_error("Platform schemas must be key-value pairs.", path)
                continue
            p_name = p_config.get("platform")
            if p_name is None:
                p_id = p_config.get(CONF_ID)
                if isinstance(p_id, Extend):
                    result.add_str_error(
                        f"Source for extension of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                if isinstance(p_id, Remove):
                    result.add_str_error(
                        f"Source for removal of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                result.add_str_error(
                    f"'{self.domain}' requires a 'platform' key but it was not specified.",
                    path,
                )
                continue
            # Remove temp output path
            result.remove_output_path(path, p_domain)

            # Process the platform configuration
            _process_platform_config(result, self.domain, p_name, p_config, path)


class AutoLoadValidationStep(ConfigValidationStep):
    """Auto load step. This step is used to automatically load components if
    a component requested that with AUTO_LOAD.
    """

    # Only load after all regular loads have taken place
    priority = -1.0

    def __init__(self, domain: str):
        self.domain = domain

    def run(self, result: Config) -> None:
        # Regular component auto-load (no platform)
        if "." not in self.domain:
            if self.domain in result:
                # already loaded
                return
            result.add_validation_step(LoadValidationStep(self.domain, core.AutoLoad()))
            return

        # Platform-specific auto-load (e.g., "ota.web_server")
        component_name, _, platform_name = self.domain.partition(".")

        # Check if component exists
        if component_name not in result:
            # Component doesn't exist, load it first
            result.add_validation_step(LoadValidationStep(component_name, []))
            # Re-run this step after the component is loaded
            result.add_validation_step(AutoLoadValidationStep(self.domain))
            return

        # Component exists, check if it's a platform component
        component = get_component(component_name)
        if component is None or not component.is_platform_component:
            result.add_str_error(
                f"Component {component_name} is not a platform component, "
                f"cannot auto-load platform {platform_name}",
                [component_name],
            )
            return

        # Ensure the component config is a list
        component_conf = result.get(component_name)
        if not isinstance(component_conf, list):
            component_conf = result[component_name] = []

        # Check if platform already exists
        if any(
            isinstance(conf, dict) and conf.get(CONF_PLATFORM) == platform_name
            for conf in component_conf
        ):
            return

        # Add and process the platform configuration
        platform_conf = core.AutoLoad()
        platform_conf[CONF_PLATFORM] = platform_name
        component_conf.append(platform_conf)

        path = [component_name, len(component_conf) - 1]
        _process_platform_config(
            result, component_name, platform_name, platform_conf, path
        )


class MetadataValidationStep(ConfigValidationStep):
    """Validate component metadata

    Responsibilties:
     - Config transformation (nullable, multi conf)
     - Check dependencies
     - Check conflicts
     - Check supported target platforms
    """

    # All components need to be loaded first to ensure dependency check works
    priority = -2.0

    def __init__(
        self,
        path: ConfigPath,
        domain: str,
        conf: ConfigType,
        component: ComponentManifest,
    ) -> None:
        self.path = path
        self.domain = domain
        self.conf = conf
        self.comp = component

    def run(self, result: Config) -> None:
        if self.conf is None:
            if self.comp.multi_conf and self.comp.multi_conf_no_default:
                result[self.domain] = self.conf = []
            else:
                result[self.domain] = self.conf = {}

        success = True
        for dependency in self.comp.dependencies:
            dependency_parts = dependency.split(".")
            if len(dependency_parts) > 2:
                result.add_str_error(
                    "Dependencies must be specified as a single component or in component.platform format only",
                    self.path,
                )
                return
            component_dep = dependency_parts[0]
            platform_dep = dependency_parts[-1]
            if component_dep not in result:
                result.add_str_error(
                    f"Component {self.domain} requires component {component_dep}",
                    self.path,
                )
                success = False
            elif component_dep != platform_dep and (
                not isinstance(platform_list := result.get(component_dep), list)
                or not any(CONF_PLATFORM in p for p in platform_list)
                or not any(p[CONF_PLATFORM] == platform_dep for p in platform_list)
            ):
                result.add_str_error(
                    f"Component {self.domain} requires 'platform: {platform_dep}' in component '{component_dep}'",
                    self.path,
                )
                success = False
        if not success:
            return

        success = True
        for conflict in self.comp.conflicts_with:
            if conflict in result:
                result.add_str_error(
                    f"Component {self.domain} cannot be used together with component {conflict}",
                    self.path,
                )
                success = False
        if not success:
            return

        if (
            not self.comp.is_platform_component
            and self.comp.config_schema is None
            and not isinstance(self.conf, core.AutoLoad)
        ):
            result.add_str_error(
                f"Component {self.domain} cannot be loaded via YAML "
                "(no CONFIG_SCHEMA).",
                self.path,
            )
            return

        if self.comp.multi_conf:
            if not isinstance(self.conf, list):
                result[self.domain] = self.conf = [self.conf]
            if (
                not isinstance(self.comp.multi_conf, bool)
                and len(self.conf) > self.comp.multi_conf
            ):
                result.add_str_error(
                    f"Component {self.domain} supports a maximum of {self.comp.multi_conf} "
                    f"entries ({len(self.conf)} found).",
                    self.path,
                )
                return
            for i, part_conf in enumerate(self.conf):
                result.add_validation_step(
                    SchemaValidationStep(
                        self.domain, self.path + [i], part_conf, self.comp
                    )
                )
            return

        result.add_validation_step(
            SchemaValidationStep(self.domain, self.path, self.conf, self.comp)
        )


class SchemaValidationStep(ConfigValidationStep):
    """Schema validation step.

    During this step all CONFIG_SCHEMAs are checked against the configs.
    """

    def __init__(
        self, domain: str, path: ConfigPath, conf: ConfigType, comp: ComponentManifest
    ):
        self.path = path
        self.conf = conf
        self.comp = comp

    def run(self, result: Config) -> None:
        token = path_context.set(self.path)
        with result.catch_error(self.path):
            if self.comp.is_platform:
                # Remove 'platform' key for validation
                input_conf = OrderedDict(self.conf)
                platform_val = input_conf.pop("platform")
                schema = cv.Schema(self.comp.config_schema)
                validated = schema(input_conf)
                # Ensure result is OrderedDict so we can call move_to_end
                if not isinstance(validated, OrderedDict):
                    validated = OrderedDict(validated)
                validated["platform"] = platform_val
                validated.move_to_end("platform", last=False)
                result.set_by_path(self.path, validated)
            elif self.comp.config_schema is not None:
                schema = cv.Schema(self.comp.config_schema)
                validated = schema(self.conf)
                result.set_by_path(self.path, validated)

        path_context.reset(token)
        result.add_validation_step(FinalValidateValidationStep(self.path, self.comp))


class IDPassValidationStep(ConfigValidationStep):
    """ID Pass step.

    During this step all ID references are checked.

    If an automatic ID reference is used, a fitting declared ID is automatically searched.
    Also checks duplicate ID names, and that referenced IDs are declared.
    """

    # Has to happen after all schemas validated
    priority = -10.0

    def __init__(self) -> None:
        pass

    def run(self, result: Config) -> None:
        from esphome.cpp_generator import MockObjClass
        from esphome.cpp_types import Component

        if result.errors:
            # If result already has errors, skip this step
            # Otherwise the user will get a bunch of missing ID warnings
            # because the component that did not validate doesn't have any IDs set
            return

        searching_ids: list[tuple[core.ID, ConfigPath]] = []
        for id, path in iter_ids(result):
            if id.is_declaration:
                if id.id is not None:
                    # Look for duplicate definitions
                    match = next(
                        (v for v in result.declare_ids if v[0].id == id.id), None
                    )
                    if match is not None:
                        opath = "->".join(str(v) for v in match[1])
                        result.add_str_error(
                            f"ID {id.id} redefined! Check {opath}", path
                        )
                        continue
                result.declare_ids.append((id, path))
            else:
                searching_ids.append((id, path))

        # Resolve default ids after manual IDs
        for id, _ in result.declare_ids:
            id.resolve([v[0].id for v in result.declare_ids])
            if isinstance(id.type, MockObjClass) and id.type.inherits_from(Component):
                CORE.component_ids.add(id.id)

        # Check searched IDs
        for id, path in searching_ids:
            if id.id is not None:
                # manually declared
                match = next(
                    (v[0] for v in result.declare_ids if v[0].id == id.id), None
                )
                if match is None or not match.is_manual:
                    # No declared ID with this name
                    import difflib

                    error = (
                        f"Couldn't find ID '{id.id}'. Please check you have defined "
                        "an ID with that name in your configuration."
                    )
                    # Find candidates
                    matches = difflib.get_close_matches(
                        id.id, [v[0].id for v in result.declare_ids if v[0].is_manual]
                    )
                    if matches:
                        matches_s = ", ".join(f'"{x}"' for x in matches)
                        error += f" These IDs look similar: {matches_s}."
                    result.add_str_error(error, path)
                    continue
                if not isinstance(match.type, MockObjClass) or not isinstance(
                    id.type, MockObjClass
                ):
                    continue
                if not match.type.inherits_from(id.type):
                    result.add_str_error(
                        f"ID '{id.id}' of type {match.type} doesn't inherit from {id.type}. "
                        "Please double check your ID is pointing to the correct value",
                        path,
                    )

            if id.id is None and id.type is not None:
                matches = []
                for v in result.declare_ids:
                    if v[0] is None or not isinstance(v[0].type, MockObjClass):
                        continue
                    inherits = v[0].type.inherits_from(id.type)
                    if inherits:
                        matches.append(v[0])

                if len(matches) == 0:
                    result.add_str_error(
                        f"Couldn't find any component that can be used for '{id.type}'. Are you missing a hub declaration?",
                        path,
                    )
                elif len(matches) == 1:
                    id.id = matches[0].id
                elif len(matches) > 1:
                    if str(id.type) == "time::RealTimeClock":
                        id.id = matches[0].id
                    else:
                        manual_declared_count = sum(1 for m in matches if m.is_manual)
                        if manual_declared_count > 0:
                            ids = ", ".join(
                                [f"'{m.id}'" for m in matches if m.is_manual]
                            )
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' {'Some are' if manual_declared_count > 1 else 'One is'} {ids}",
                                path,
                            )
                        else:
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' You must assign an explicit ID to the parent component you want to use.",
                                path,
                            )


class RemoveReferenceValidationStep(ConfigValidationStep):
    """
    Make sure all !remove references have been removed from the config.
    Any left overs mean the merge step couldn't find corresponding previously existing id/key
    """

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return

        def recursive_check_remove_tag(config: Config, path: ConfigPath = None):
            path = path or []

            if isinstance(config, Remove):
                result.add_str_error(
                    f"Source for removal at '{'->'.join([str(p) for p in path])}' was not found.",
                    path,
                )
            elif isinstance(config, list):
                for i, item in enumerate(config):
                    recursive_check_remove_tag(item, path + [i])
            elif isinstance(config, dict):
                for key, value in config.items():
                    recursive_check_remove_tag(value, path + [key])

        recursive_check_remove_tag(result)


class FinalValidateValidationStep(ConfigValidationStep):
    """Run final_validate_schema for all components."""

    # Has to happen after ID pass validated
    priority = -20.0

    def __init__(self, path: ConfigPath, comp: ComponentManifest) -> None:
        self.path = path
        self.comp = comp

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return

        token = fv.full_config.set(result)

        conf = result.get_nested_item(self.path)
        with result.catch_error(self.path):
            if self.comp.final_validate_schema is not None:
                self.comp.final_validate_schema(conf)

        fv.full_config.reset(token)


class PinUseValidationCheck(ConfigValidationStep):
    """Check for pin reuse"""

    priority = -30  # Should happen after component final validations

    def __init__(self) -> None:
        pass

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return
        pins.PIN_SCHEMA_REGISTRY.final_validate(result)


def validate_config(
    config: dict[str, Any], command_line_substitutions: dict[str, Any]
) -> Config:
    result = Config()

    loader.clear_component_meta_finders()
    loader.install_custom_components_meta_finder()

    # 0. Load packages
    if CONF_PACKAGES in config:
        from esphome.components.packages import do_packages_pass

        result.add_output_path([CONF_PACKAGES], CONF_PACKAGES)
        try:
            config = do_packages_pass(config)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1. Load substitutions
    if CONF_SUBSTITUTIONS in config or command_line_substitutions:
        from esphome.components import substitutions

        result[CONF_SUBSTITUTIONS] = {
            **(config.get(CONF_SUBSTITUTIONS) or {}),
            **command_line_substitutions,
        }
        result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
        try:
            substitutions.do_substitution_pass(config, command_line_substitutions)
        except vol.Invalid as err:
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1.1. Check for REPLACEME special value
    try:
        recursive_check_replaceme(config)
    except vol.Invalid as err:
        result.add_error(err)

    # 1.2. Load external_components
    if CONF_EXTERNAL_COMPONENTS in config:
        from esphome.components.external_components import do_external_components_pass

        result.add_output_path([CONF_EXTERNAL_COMPONENTS], CONF_EXTERNAL_COMPONENTS)
        try:
            do_external_components_pass(config)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    if "esphomeyaml" in config:
        _LOGGER.warning(
            "The esphomeyaml section has been renamed to esphome in 1.11.0. "
            "Please replace 'esphomeyaml:' in your configuration with 'esphome:'."
        )
        config[CONF_ESPHOME] = config.pop("esphomeyaml")

    if CONF_ESPHOME not in config:
        result.add_str_error(
            "'esphome' section missing from configuration. Please make sure "
            "your configuration has an 'esphome:' line in it.",
            [],
        )
        return result

    # 2. Load partial core config
    result[CONF_ESPHOME] = config[CONF_ESPHOME]
    result.add_output_path([CONF_ESPHOME], CONF_ESPHOME)
    try:
        target_platform = core_config.preload_core_config(config, result)
    except vol.Invalid as err:
        result.add_error(err)
        return result
    # Remove temporary esphome config path again, it will be reloaded later
    result.remove_output_path([CONF_ESPHOME], CONF_ESPHOME)

    # Check version number now to avoid loading components that are not supported
    if min_version := config[CONF_ESPHOME].get(CONF_MIN_VERSION):
        cv.All(cv.version_number, cv.validate_esphome_version)(min_version)

    # First run platform validation steps
    result.add_validation_step(
        LoadValidationStep(target_platform, config[target_platform])
    )
    result.run_validation_steps()

    if result.errors:
        # do not try to validate further as we don't know what the target is
        return result

    for domain, conf in config.items():
        result.add_validation_step(LoadValidationStep(domain, conf))
    result.add_validation_step(IDPassValidationStep())
    result.add_validation_step(PinUseValidationCheck())

    result.add_validation_step(RemoveReferenceValidationStep())

    result.run_validation_steps()

    return result


def humanize_error(config, validation_error):
    validation_error = str(validation_error)
    m = re.match(
        r"^(.*?)\s*(?:for dictionary value )?@ data\[.*$", validation_error, re.DOTALL
    )
    if m is not None:
        validation_error = m.group(1)
    validation_error = validation_error.strip()
    if not validation_error.endswith("."):
        validation_error += "."
    return validation_error


def _get_parent_name(path, config):
    if not path:
        return "<root>"
    for domain_path, domain in config.output_paths:
        if _path_begins_with(path, domain_path):
            if len(path) > len(domain_path):
                # Sub-item
                break
            return domain
    # When processing a list, skip back over the index
    while len(path) > 1 and isinstance(path[-1], int):
        path = path[:-1]
    return path[-1]


def _format_vol_invalid(ex: vol.Invalid, config: Config) -> str:
    message = ""

    paren = _get_parent_name(ex.path[:-1], config)

    if isinstance(ex, ExtraKeysInvalid):
        if ex.candidates:
            message += f"[{ex.path[-1]}] is an invalid option for [{paren}]. Did you mean {', '.join(f'[{x}]' for x in ex.candidates)}?"
        else:
            message += f"[{ex.path[-1]}] is an invalid option for [{paren}]. Please check the indentation."
    elif "extra keys not allowed" in str(ex):
        message += f"[{ex.path[-1]}] is an invalid option for [{paren}]."
    elif isinstance(ex, vol.RequiredFieldInvalid):
        if ex.msg == "required key not provided":
            message += f"'{ex.path[-1]}' is a required option for [{paren}]."
        else:
            # Required has set a custom error message
            message += ex.msg
    else:
        message += humanize_error(config, ex)

    return message


class InvalidYAMLError(EsphomeError):
    def __init__(self, base_exc):
        try:
            base = str(base_exc)
        except UnicodeDecodeError:
            base = repr(base_exc)
        message = f"Invalid YAML syntax:\n\n{base}"
        super().__init__(message)
        self.base_exc = base_exc


def _load_config(command_line_substitutions: dict[str, Any]) -> Config:
    """Load the configuration file."""
    try:
        config = yaml_util.load_yaml(CORE.config_path)
    except EsphomeError as e:
        raise InvalidYAMLError(e) from e

    try:
        return validate_config(config, command_line_substitutions)
    except EsphomeError:
        raise
    except Exception:
        _LOGGER.error("Unexpected exception while reading configuration:")
        raise


def load_config(command_line_substitutions: dict[str, Any]) -> Config:
    try:
        return _load_config(command_line_substitutions)
    except vol.Invalid as err:
        raise EsphomeError(f"Error while parsing config: {err}") from err


def line_info(config, path, highlight=True):
    """Display line config source."""
    if not highlight:
        return None
    obj = config.get_deepest_document_range_for_path(path)
    if obj:
        mark = obj.start_mark
        source = f"[source {mark.document}:{mark.line + 1}]"
        return color(AnsiFore.CYAN, source)
    return "None"


def _print_on_next_line(obj):
    if isinstance(obj, (list, tuple, dict)):
        return True
    if isinstance(obj, str):
        return len(obj) > 80
    if isinstance(obj, core.Lambda):
        return len(obj.value) > 80
    return False


def dump_dict(
    config: Config, path: ConfigPath, at_root: bool = True
) -> tuple[str, bool]:
    conf = config.get_nested_item(path)
    ret = ""
    multiline = False

    if at_root:
        error = config.get_error_for_path(path)
        if error is not None:
            ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

    if isinstance(conf, (list, tuple)):
        multiline = True
        if not conf:
            ret += "[]"
            multiline = False

        for i in range(len(conf)):
            path_ = path + [i]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            sep = "- "
            if config.is_in_error_path(path_):
                sep = color(AnsiFore.RED, sep)
            msg, _ = dump_dict(config, path_, at_root=False)
            msg = indent(msg)
            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if inf is not None:
                msg = f"{inf}\n{msg}"
            elif msg:
                msg = msg[2:]
            ret += f"{sep + msg}\n"
    elif isinstance(conf, dict):
        multiline = True
        if not conf:
            ret += "{}"
            multiline = False

        for k in conf.keys():
            path_ = path + [k]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            st = f"{k}: "
            if config.is_in_error_path(path_):
                st = color(AnsiFore.RED, st)
            msg, m = dump_dict(config, path_, at_root=False)

            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if m:
                msg = f"\n{indent(msg)}"

            if inf is not None:
                if m:
                    msg = f" {inf}{msg}"
                else:
                    msg = f"{msg} {inf}"
            ret += f"{st + msg}\n"
    elif isinstance(conf, str):
        if is_secret(conf):
            conf = f"!secret {is_secret(conf)}"
        if not conf:
            conf += "''"

        if len(conf) > 80:
            conf = f"|-\n{indent(conf)}"
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, str(conf))
    elif isinstance(conf, core.Lambda):
        if is_secret(conf):
            conf = f"!secret {is_secret(conf)}"

        conf = f"!lambda |-\n{indent(str(conf.value))}"
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, conf)
    elif conf is None:
        pass
    else:
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, str(conf))
        multiline = "\n" in ret

    return ret, multiline


def strip_default_ids(config):
    if isinstance(config, list):
        to_remove = []
        for i, x in enumerate(config):
            x = config[i] = strip_default_ids(x)
            if (isinstance(x, core.ID) and not x.is_manual) or isinstance(
                x, core.AutoLoad
            ):
                to_remove.append(x)
        for x in to_remove:
            config.remove(x)
    elif isinstance(config, dict):
        to_remove = []
        for k, v in config.items():
            v = config[k] = strip_default_ids(v)
            if (isinstance(v, core.ID) and not v.is_manual) or isinstance(
                v, core.AutoLoad
            ):
                to_remove.append(k)
        for k in to_remove:
            config.pop(k)
    return config


def read_config(command_line_substitutions):
    _LOGGER.info("Reading configuration %s...", CORE.config_path)
    try:
        res = load_config(command_line_substitutions)
    except EsphomeError as err:
        _LOGGER.error("Error while reading config: %s", err)
        return None
    if res.errors:
        if not CORE.verbose:
            res = strip_default_ids(res)

        safe_print(color(AnsiFore.BOLD_RED, "Failed config"))
        safe_print("")
        for path, domain in res.output_paths:
            if not res.is_in_error_path(path):
                continue

            errstr = color(AnsiFore.BOLD_RED, f"{domain}:")
            errline = line_info(res, path)
            if errline:
                errstr += f" {errline}"
            safe_print(errstr)
            split_dump = dump_dict(res, path)[0].splitlines()
            # find the last error message
            i = len(split_dump) - 1
            while i > 10 and "\033[" not in split_dump[i]:
                i = i - 1
            # discard lines more than 4 beyond the last error
            i = min(i + 4, len(split_dump))
            safe_print(indent("\n".join(split_dump[:i])))

        for err in res.errors:
            safe_print(color(AnsiFore.BOLD_RED, err.msg))
            safe_print("")

        return None
    return res
